1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.springframework.integration.jpa.core;
18
19 import java.util.List;
20
21 import javax.persistence.EntityManager;
22 import javax.persistence.EntityManagerFactory;
23
24 import org.springframework.beans.BeansException;
25 import org.springframework.beans.factory.BeanFactory;
26 import org.springframework.beans.factory.BeanFactoryAware;
27 import org.springframework.beans.factory.InitializingBean;
28 import org.springframework.expression.EvaluationContext;
29 import org.springframework.expression.Expression;
30 import org.springframework.expression.common.LiteralExpression;
31 import org.springframework.integration.expression.ExpressionUtils;
32 import org.springframework.integration.jpa.support.JpaParameter;
33 import org.springframework.integration.jpa.support.PersistMode;
34 import org.springframework.integration.jpa.support.parametersource.BeanPropertyParameterSourceFactory;
35 import org.springframework.integration.jpa.support.parametersource.ExpressionEvaluatingParameterSourceFactory;
36 import org.springframework.integration.jpa.support.parametersource.ParameterSource;
37 import org.springframework.integration.jpa.support.parametersource.ParameterSourceFactory;
38 import org.springframework.lang.Nullable;
39 import org.springframework.messaging.Message;
40 import org.springframework.messaging.MessagingException;
41 import org.springframework.util.Assert;
42 import org.springframework.util.CollectionUtils;
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71 public class JpaExecutor implements InitializingBean, BeanFactoryAware {
72
73 private final JpaOperations jpaOperations;
74
75 private List<JpaParameter> jpaParameters;
76
77 private Class<?> entityClass;
78
79 private String jpaQuery;
80
81 private String nativeQuery;
82
83 private String namedQuery;
84
85 private Expression maxResultsExpression;
86
87 private Expression firstResultExpression;
88
89 private Expression idExpression;
90
91 private PersistMode persistMode = PersistMode.MERGE;
92
93 private ParameterSourceFactory parameterSourceFactory = null;
94
95 private ParameterSource parameterSource;
96
97 private boolean flush = false;
98
99 private int flushSize = 0;
100
101 private boolean clearOnFlush = false;
102
103 private boolean deleteAfterPoll = false;
104
105 private boolean deleteInBatch = false;
106
107 private boolean expectSingleResult = false;
108
109
110
111
112
113
114
115 private Boolean usePayloadAsParameterSource = null;
116
117 private BeanFactory beanFactory;
118
119 private EvaluationContext evaluationContext;
120
121
122
123
124
125
126 public JpaExecutor(EntityManagerFactory entityManagerFactory) {
127 Assert.notNull(entityManagerFactory, "entityManagerFactory must not be null.");
128
129 DefaultJpaOperations defaultJpaOperations = new DefaultJpaOperations();
130 defaultJpaOperations.setEntityManagerFactory(entityManagerFactory);
131 defaultJpaOperations.afterPropertiesSet();
132
133 this.jpaOperations = defaultJpaOperations;
134 }
135
136
137
138
139
140 public JpaExecutor(EntityManager entityManager) {
141 Assert.notNull(entityManager, "entityManager must not be null.");
142
143 DefaultJpaOperations defaultJpaOperations = new DefaultJpaOperations();
144 defaultJpaOperations.setEntityManager(entityManager);
145 defaultJpaOperations.afterPropertiesSet();
146 this.jpaOperations = defaultJpaOperations;
147 }
148
149
150
151
152
153
154
155
156 public JpaExecutor(JpaOperations jpaOperations) {
157 Assert.notNull(jpaOperations, "jpaOperations must not be null.");
158 this.jpaOperations = jpaOperations;
159 }
160
161 public void setIntegrationEvaluationContext(EvaluationContext evaluationContext) {
162 this.evaluationContext = evaluationContext;
163 }
164
165
166
167
168
169
170 public void setEntityClass(Class<?> entityClass) {
171 Assert.notNull(entityClass, "entityClass must not be null.");
172 this.entityClass = entityClass;
173 }
174
175
176
177
178 public void setJpaQuery(String jpaQuery) {
179 Assert.isTrue(this.nativeQuery == null && this.namedQuery == null, "You can define only one of the "
180 + "properties 'jpaQuery', 'nativeQuery', 'namedQuery'");
181 Assert.hasText(jpaQuery, "jpaQuery must neither be null nor empty.");
182 this.jpaQuery = jpaQuery;
183 }
184
185
186
187
188
189
190
191
192 public void setNativeQuery(String nativeQuery) {
193
194 Assert.isTrue(this.namedQuery == null && this.jpaQuery == null, "You can define only one of the "
195 + "properties 'jpaQuery', 'nativeQuery', 'namedQuery'");
196 Assert.hasText(nativeQuery, "nativeQuery must neither be null nor empty.");
197
198 this.nativeQuery = nativeQuery;
199 }
200
201
202
203
204
205
206 public void setNamedQuery(String namedQuery) {
207
208 Assert.isTrue(this.jpaQuery == null && this.nativeQuery == null, "You can define only one of the "
209 + "properties 'jpaQuery', 'nativeQuery', 'namedQuery'");
210
211 Assert.hasText(namedQuery, "namedQuery must neither be null nor empty.");
212 this.namedQuery = namedQuery;
213 }
214
215 public void setPersistMode(PersistMode persistMode) {
216 this.persistMode = persistMode;
217 }
218
219 public void setJpaParameters(List<JpaParameter> jpaParameters) {
220 this.jpaParameters = jpaParameters;
221 }
222
223 public void setUsePayloadAsParameterSource(Boolean usePayloadAsParameterSource) {
224 this.usePayloadAsParameterSource = usePayloadAsParameterSource;
225 }
226
227
228
229
230
231
232
233
234 public void setFlush(boolean flush) {
235 this.flush = flush;
236 }
237
238
239
240
241
242
243
244
245
246 public void setFlushSize(int flushSize) {
247 Assert.state(flushSize >= 0, "'flushSize' cannot be less than '0'.");
248 this.flushSize = flushSize;
249 }
250
251
252
253
254
255
256
257
258
259 public void setClearOnFlush(boolean clearOnFlush) {
260 this.clearOnFlush = clearOnFlush;
261 }
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276 public void setDeleteInBatch(boolean deleteInBatch) {
277 this.deleteInBatch = deleteInBatch;
278 }
279
280
281
282
283
284
285 public void setDeleteAfterPoll(boolean deleteAfterPoll) {
286 this.deleteAfterPoll = deleteAfterPoll;
287 }
288
289
290
291
292 public void setParameterSourceFactory(ParameterSourceFactory parameterSourceFactory) {
293 Assert.notNull(parameterSourceFactory, "parameterSourceFactory must not be null.");
294 this.parameterSourceFactory = parameterSourceFactory;
295 }
296
297
298
299
300
301
302 public void setParameterSource(ParameterSource parameterSource) {
303 Assert.notNull(parameterSource, "parameterSource must not be null.");
304 this.parameterSource = parameterSource;
305 }
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321 public void setExpectSingleResult(boolean expectSingleResult) {
322 this.expectSingleResult = expectSingleResult;
323 }
324
325
326
327
328
329
330
331 public void setFirstResultExpression(Expression firstResultExpression) {
332 this.firstResultExpression = firstResultExpression;
333 }
334
335
336
337
338
339
340
341 public void setIdExpression(Expression idExpression) {
342 this.idExpression = idExpression;
343 }
344
345
346
347
348
349
350 public void setMaxResultsExpression(Expression maxResultsExpression) {
351 Assert.notNull(maxResultsExpression, "maxResultsExpression cannot be null");
352 this.maxResultsExpression = maxResultsExpression;
353 }
354
355
356
357
358
359
360
361 public void setMaxNumberOfResults(int maxNumberOfResults) {
362 this.setMaxResultsExpression(new LiteralExpression("" + maxNumberOfResults));
363 }
364
365 @Override
366 public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
367 this.beanFactory = beanFactory;
368 }
369
370
371
372
373
374 @Override
375 public void afterPropertiesSet() {
376 if (!CollectionUtils.isEmpty(this.jpaParameters)) {
377 if (this.parameterSourceFactory == null) {
378 ExpressionEvaluatingParameterSourceFactory expressionSourceFactory =
379 new ExpressionEvaluatingParameterSourceFactory(this.beanFactory);
380 expressionSourceFactory.setParameters(this.jpaParameters);
381 this.parameterSourceFactory = expressionSourceFactory;
382
383 }
384 else {
385 throw new IllegalStateException("The 'jpaParameters' and 'parameterSourceFactory' " +
386 "are mutually exclusive. Consider to configure parameters on the provided " +
387 "'parameterSourceFactory': " + this.parameterSourceFactory);
388 }
389
390 if (this.usePayloadAsParameterSource == null) {
391 this.usePayloadAsParameterSource = false;
392 }
393
394 }
395 else {
396
397 if (this.parameterSourceFactory == null) {
398 this.parameterSourceFactory = new BeanPropertyParameterSourceFactory();
399 }
400
401 if (this.usePayloadAsParameterSource == null) {
402 this.usePayloadAsParameterSource = true;
403 }
404 }
405
406 if (this.flushSize > 0) {
407 this.flush = true;
408 }
409 else if (this.flush) {
410 this.flushSize = 1;
411 }
412
413 if (this.evaluationContext == null) {
414 this.evaluationContext = ExpressionUtils.createStandardEvaluationContext(this.beanFactory);
415 }
416 }
417
418
419
420
421
422
423
424
425
426
427
428 public Object executeOutboundJpaOperation(Message<?> message) {
429 ParameterSource paramSource = null;
430 if (this.jpaQuery != null || this.nativeQuery != null || this.namedQuery != null) {
431 paramSource = determineParameterSource(message);
432 }
433 if (this.jpaQuery != null) {
434 return this.jpaOperations.executeUpdate(this.jpaQuery, paramSource);
435 }
436 else if (this.nativeQuery != null) {
437 return this.jpaOperations.executeUpdateWithNativeQuery(this.nativeQuery, paramSource);
438 }
439 else if (this.namedQuery != null) {
440 return this.jpaOperations.executeUpdateWithNamedQuery(this.namedQuery, paramSource);
441 }
442 else {
443 return executeOutboundJpaOperationOnPersistentMode(message);
444 }
445 }
446
447 private Object executeOutboundJpaOperationOnPersistentMode(Message<?> message) {
448 Object payload = message.getPayload();
449 switch (this.persistMode) {
450 case PERSIST:
451 this.jpaOperations.persist(payload, this.flushSize, this.clearOnFlush);
452 return payload;
453 case MERGE:
454 return this.jpaOperations.merge(payload, this.flushSize, this.clearOnFlush);
455 case DELETE:
456 this.jpaOperations.delete(payload);
457 if (this.flush) {
458 this.jpaOperations.flush();
459 }
460 return payload;
461 default:
462 throw new IllegalStateException("Unsupported PersistMode: " + this.persistMode.name());
463 }
464 }
465
466
467
468
469
470 @Nullable
471 public Object poll() {
472 return poll(null);
473 }
474
475
476
477
478
479
480
481
482
483
484 @Nullable
485 public Object poll(@Nullable final Message<?> requestMessage) {
486 final Object payload;
487
488 if (this.idExpression != null) {
489 Object id = this.idExpression.getValue(this.evaluationContext, requestMessage);
490 Assert.state(id != null, "The 'idExpression' cannot evaluate to null.");
491 Class<?> entityClazz = this.entityClass;
492 if (entityClazz == null && requestMessage != null) {
493 entityClazz = requestMessage.getPayload().getClass();
494 }
495 Assert.state(entityClazz != null, "The entity class to retrieve cannot be null.");
496 payload = this.jpaOperations.find(entityClazz, id);
497 }
498 else {
499 final List<?> result;
500 int maxNumberOfResults = evaluateExpressionForNumericResult(requestMessage, this.maxResultsExpression);
501 if (requestMessage == null) {
502 result = doPoll(this.parameterSource, 0, maxNumberOfResults);
503 }
504 else {
505 int firstResult = 0;
506 if (this.firstResultExpression != null) {
507 firstResult = getFirstResult(requestMessage);
508 }
509 ParameterSource paramSource = determineParameterSource(requestMessage);
510 result = doPoll(paramSource, firstResult, maxNumberOfResults);
511 }
512
513 if (result.isEmpty()) {
514 payload = null;
515 }
516 else {
517 if (this.expectSingleResult) {
518 if (result.size() == 1) {
519 payload = result.iterator().next();
520 }
521 else if (requestMessage != null) {
522 throw new MessagingException(requestMessage,
523 "The Jpa operation returned more than 1 result for expectSingleResult mode.");
524 }
525 else {
526 throw new MessagingException(
527 "The Jpa operation returned more than 1 result for expectSingleResult mode.");
528 }
529 }
530 else {
531 payload = result;
532 }
533 }
534 }
535
536 checkDelete(payload);
537 return payload;
538 }
539
540 private void checkDelete(final Object payload) {
541 if (payload != null && this.deleteAfterPoll) {
542 if (payload instanceof Iterable) {
543 if (this.deleteInBatch) {
544 this.jpaOperations.deleteInBatch((Iterable<?>) payload);
545 }
546 else {
547 for (Object entity : (Iterable<?>) payload) {
548 this.jpaOperations.delete(entity);
549 }
550 }
551 }
552 else {
553 this.jpaOperations.delete(payload);
554 }
555
556 if (this.flush) {
557 this.jpaOperations.flush();
558 }
559 }
560 }
561
562 protected List<?> doPoll(ParameterSource jpaQLParameterSource, int firstResult, int maxNumberOfResults) {
563 List<?> payload;
564 if (this.jpaQuery != null) {
565 payload =
566 this.jpaOperations.getResultListForQuery(this.jpaQuery, jpaQLParameterSource,
567 firstResult, maxNumberOfResults);
568 }
569 else if (this.nativeQuery != null) {
570 payload =
571 this.jpaOperations.getResultListForNativeQuery(this.nativeQuery, this.entityClass,
572 jpaQLParameterSource, firstResult, maxNumberOfResults);
573 }
574 else if (this.namedQuery != null) {
575 payload =
576 this.jpaOperations.getResultListForNamedQuery(this.namedQuery, jpaQLParameterSource,
577 firstResult, maxNumberOfResults);
578 }
579 else if (this.entityClass != null) {
580 payload = this.jpaOperations.getResultListForClass(this.entityClass, firstResult, maxNumberOfResults);
581 }
582 else {
583 throw new IllegalStateException("For the polling operation, one of "
584 + "the following properties must be specified: "
585 + "query, namedQuery or entityClass.");
586 }
587 return payload;
588 }
589
590 private int getFirstResult(final Message<?> requestMessage) {
591 return evaluateExpressionForNumericResult(requestMessage, this.firstResultExpression);
592 }
593
594 private int evaluateExpressionForNumericResult(@Nullable final Message<?> requestMessage,
595 @Nullable Expression expression) {
596
597 int evaluatedResult = 0;
598 if (expression != null) {
599 Object evaluationResult = expression.getValue(this.evaluationContext, requestMessage);
600
601 if (evaluationResult != null) {
602 if (evaluationResult instanceof Number) {
603 evaluatedResult = ((Number) evaluationResult).intValue();
604 }
605 else if (evaluationResult instanceof String) {
606 try {
607 evaluatedResult = Integer.parseInt((String) evaluationResult);
608 }
609 catch (NumberFormatException e) {
610 throw new IllegalArgumentException(
611 "Value " + evaluationResult + " passed as cannot be " +
612 "parsed to a number, expected to be numeric", e);
613 }
614 }
615 else {
616 throw new IllegalArgumentException("Expected the value to be a Number got "
617 + evaluationResult.getClass().getName());
618 }
619 }
620 }
621 return evaluatedResult;
622 }
623
624 private ParameterSource determineParameterSource(final Message<?> requestMessage) {
625 if (this.usePayloadAsParameterSource) {
626 return this.parameterSourceFactory.createParameterSource(requestMessage.getPayload());
627 }
628 else {
629 return this.parameterSourceFactory.createParameterSource(requestMessage);
630 }
631 }
632
633 }